-
Notifications
You must be signed in to change notification settings - Fork 1.7k
fix: UnnestExec preserves relevant equivalence properties of input #16985
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
7527f16 to
5838f45
Compare
|
Tagging @alamb, maybe you can trigger CI? 🙏🏻 |
5838f45 to
751a8ba
Compare
| /// For list unnesting, each rows is vertically transformed into multiple rows | ||
| /// For struct unnesting, each columns is horizontally transformed into multiple columns, | ||
| /// For list unnesting, each row is vertically transformed into multiple rows | ||
| /// For struct unnesting, each column is horizontally transformed into multiple columns, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Grammar fix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice!
523eefd to
80567ec
Compare
|
I discovered I have updated the PR doing that. Equivalence properties are
I am pretty sure that this takes care of 1 and 2, since we now have no equivalence properties for the columns. I am not yet sure about 3, though - if the original expression uses a column that is a primary key, after the unnest we will have multiple rows with the same column. Does that mean we need to remove that constraint from the eq properties? It kinda sounds like yes, but I need to see exactly what it's being used for. |
80567ec to
a17ec47
Compare
After reading some more I have now updated it so that we remove any constraint from the properties. I've updated the PR description. I think this is semantically sound now. FYI @alamb and @asubiotto |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work! This LGTM, I'll leave it to @alamb for a final review and CI kickoff.
79ec7e6 to
f1e889d
Compare
0cf176b to
95cdb26
Compare
c23e4a9 to
4093afb
Compare
|
Since CI ran on this one, I'll leave it here without updating the branch until this gets reviewed again 👍🏻 |
|
@berkaysynnada @suremarc @alamb Gentle ping for a review! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To me, the changes and tests make sense. Thanks!
CAVEAT: I am by no means a DataFusion pro. Just trying to learn more while providing some feedback. :)
|
Maybe one additional note: I think the resulting sort properties can be improved for unnesting structs if we know that the struct columns themselves are ordered. If that makes sense we could also somehow expand the But as this is already an improvement I think that tracking this in a separate issue is fine. |
|
Thank you so much @tobixdev! |
Great idea! |
|
I took a look and it seems all good to me but given there's already been a lot of review on it I think the existing reviewers need to approve for it to be mergeable, so I will defer to them. Consider this my token ✅ |
4093afb to
c42c8c1
Compare
|
Thanks a lot everyone! @alamb ready for the stamp now ;) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for this contribution @vegarsti
I think this is very close. I think it should have:
- Some additional tests / comments cleanup (see comments)
- Avoid
unwrap/expectto minimize the severity of symptoms
| physical_plan | ||
| 01)ProjectionExec: expr=[array_agg(unnested.ar)@1 as array_agg(unnested.ar)] | ||
| 02)--AggregateExec: mode=FinalPartitioned, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted | ||
| 03)----SortExec: expr=[generated_id@0 ASC NULLS LAST], preserve_partitioning=[true] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this plan shows the data being sorted, but the comment suggests it should not be 🤔
Could you please explain in more detail what you expect this explain plan to be showing? Given there is no ORDER BY in the query (or in the OVER clause) it is not clear why this is testing ordering
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, this test is from your comment here #15231 (comment)
By "the comment" here do you mean "Unnest with ordering on unrelated column is preserved"?
The point of the test is to show that we don't lose the inherent ordering of generated_id (through row_number()) when we do unnest, so if we look at 11) BoundedWindowAggExec, we see mode=[Sorted] and then we also see ordering_mode=Sorted at 06) AggregateExec, and at 02) AggregateExec. I will include this in the comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wait, sorry, I understand your comment now. You're saying it is being sorted at 03 here, and the comment is saying it shouldn't. I agree. Good catch. How did I make this regression happen? For sure it was not doing this sorting in a previous iteration of this PR...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You actually pointed this test case out here as well and I didn't fix it, I just added your new test case (which does pass, thankfully) #16985 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not able to understand. The change in this PR indeed adds SortExec step in 3, making the physical plan go from
01)ProjectionExec: expr=[array_agg(unnested.ar)@1 as array_agg(unnested.ar)]
02)--AggregateExec: mode=FinalPartitioned, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)]
03)----CoalesceBatchesExec: target_batch_size=8192
04)------RepartitionExec: partitioning=Hash([generated_id@0], 4), input_partitions=4
05)--------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)]
06)----------ProjectionExec: expr=[generated_id@0 as generated_id, __unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
07)------------UnnestExec
08)--------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))]
09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
10)------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { name: "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
11)--------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192]
to
physical_plan
01)ProjectionExec: expr=[array_agg(unnested.ar)@1 as array_agg(unnested.ar)]
02)--AggregateExec: mode=FinalPartitioned, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
03)----SortExec: expr=[generated_id@0 ASC NULLS LAST], preserve_partitioning=[true]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------RepartitionExec: partitioning=Hash([generated_id@0], 4), input_partitions=4
06)----------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
07)------------ProjectionExec: expr=[generated_id@0 as generated_id, __unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
08)--------------UnnestExec
09)----------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))]
10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
11)--------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { name: "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
12)----------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like a regression, doesn't it? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I spent some more time looking and I think your code is working as expected
Namely, Note the ordering_mode=Sorted that is above the UnnestExec - that means that the GROUP BY columns (in this case generated_id) are sorted, as expected.
06)----------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
07)------------ProjectionExec: expr=[generated_id@0 as generated_id, __unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
08)--------------UnnestExec
09)----------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))]
10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
11)--------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { name: "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
The reason there is a sort in the new plan is that the optimizer has decided to repartition the intermediate aggregate result (unrelated to this PR)
03)----SortExec: expr=[generated_id@0 ASC NULLS LAST], preserve_partitioning=[true]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------RepartitionExec: partitioning=Hash([generated_id@0], 4), input_partitions=4
06)----------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I see. That's a relief. Thank you.
| 3 400 | ||
| 1 400 | ||
|
|
||
| # Explain should not have a SortExec |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you also please add two additional tests:
- a negative test case here. order by the output of the unnest and verify that it is in fact sorted correctly
- A case with the ordering column as the first index (e.g. tuples like (
100, [3,2,1], 'a')and then order by 100
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1: Included in 2b9bece
2: I'm not quite sure I understand what you mean to test here. In de7a558 I added a table with three columns of the form 100, [3,2,1], 'a' and then do a query where we select the tuple of these columns and unnest it. But in this case we discard it, since we (with the implementation in this PR) totally discard ordering when we unnest. I have a feeling this isn't what you meant - let me know
|
You have marked a few conversations resolved but I don't see any new commits. Perhaps you have them locally? |
Sorry, that is confusing. Indeed. Pushing a commit tomorrow morning! |
Awesome -- thank you! I am sorry I just feel bad this one has been dragging out so long |
No worries! Commits pushed now. Thanks for the careful review! I understand why you're careful about this, because it appears this isn't quite ready, since as you point out in #16985 (comment), that test case now does a sort at the end, whereas on main it actually doesn't. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @vegarsti this now looks ready to go. I looked carefully at the plans and I think everything looks like it is working as expected
🚀
| physical_plan | ||
| 01)ProjectionExec: expr=[array_agg(unnested.ar)@1 as array_agg(unnested.ar)] | ||
| 02)--AggregateExec: mode=FinalPartitioned, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted | ||
| 03)----SortExec: expr=[generated_id@0 ASC NULLS LAST], preserve_partitioning=[true] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I spent some more time looking and I think your code is working as expected
Namely, Note the ordering_mode=Sorted that is above the UnnestExec - that means that the GROUP BY columns (in this case generated_id) are sorted, as expected.
06)----------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
07)------------ProjectionExec: expr=[generated_id@0 as generated_id, __unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
08)--------------UnnestExec
09)----------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as __unnest_placeholder(make_array(range().value))]
10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
11)--------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { name: "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
The reason there is a sort in the new plan is that the optimizer has decided to repartition the intermediate aggregate result (unrelated to this PR)
03)----SortExec: expr=[generated_id@0 ASC NULLS LAST], preserve_partitioning=[true]
04)------CoalesceBatchesExec: target_batch_size=8192
05)--------RepartitionExec: partitioning=Hash([generated_id@0], 4), input_partitions=4
06)----------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
|
Thank you also @asubiotto and @tobixdev for the help reviewing |
|
Awesome! Thank you! |
Which issue does this PR close?
What changes are included in this PR?
UnnestExec'scompute_propertieswe now construct itsEquivalencePropertiesusing what we can from the input plan, so that we preserve sort ordering of unrelated columns (and avoid unnecessary sorts further up in the plan).Are these changes tested?
UnnestExecinunnest.sltAre there any user-facing changes?
No
Explanation
Given a struct or array value
col,unnest(col)takes the N elements ofcoland "spreads" these onto N rows, where all other columns in the statement are preserved. Said another way, when we unnest a column we are inserting a lateral cross-join against its elements, which by construction:E.g. (from
unnest.slt):datafusion/datafusion/sqllogictest/test_files/unnest.slt
Lines 699 to 712 in 6d9b76e
The
EquivalencePropertiesstruct has three types of properties:In this PR we construct the
UnnestExecnode'sEquivalencePropertiesby using the input plan's equivalence properties for the columns that are not transformed - except for table constraints, which we discard entirely. The reasoning for discarding constraints is that because we're duplicating the other columns across rows, we are invalidating any uniqueness or primary-key constraint. We also need to some twiddling with the mapping of the projection (indices change due to the unnesting).